package com.atguigu.flink.sql.join;

import com.atguigu.flink.function.WaterSensorMapFunction;
import com.atguigu.flink.pojo.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 *
 *      常规Join(不开窗之类)
 *          inner join，left out join
 *
 *          https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/table/sql/queries/joins/
 */
public class Demo1_CommonJoin
{
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);

        //配置对象
        TableConfig config = tableEnvironment.getConfig();
        //状态的存活时间
        config.setIdleStateRetention(Duration.ofSeconds(10));
        //config.set("table.exec.state.ttl",10 * 1000 +"");


        env.setParallelism(1);


        SingleOutputStreamOperator<WaterSensor> ds1 = env
            .socketTextStream("hadoop102", 8888)
            .map(new WaterSensorMapFunction());

        SingleOutputStreamOperator<WaterSensor> ds2 = env
            .socketTextStream("hadoop102", 8889)
            .map(new WaterSensorMapFunction());


        tableEnvironment.createTemporaryView("t1",ds1);
        tableEnvironment.createTemporaryView("t2",ds2);

        /*
            inner join: 每关联一条，向结果表总增加一条 +I 数据。 可以使用普通的kafka连接器写出

            join的时候，两张的数据，默认会缓存在状态中。

            left join: 默认取左表数据的全部。如果一开始左表的数据没有和右表关联上，此时向表中新增一条+I的数据。
                    一旦右表来了数据，关联成功了，此时会删除之前关联的结果，新增一条新的结果。

                    left join会产生 -D的数据，只能用 upsert kafka去写出。

             ---------------------
                设置了状态的存活时间后，inner join左右两侧的表都遵守存储时间。超过存活时间没有被访问，数据会在状态中被删除。
                设置了状态的存活时间后，left join左侧的表每次被访问后会重置存活时间。
                                    右侧的表超过存活时间，数据会在状态中被删除。
         */
        Table table = tableEnvironment.sqlQuery("select t1.id,t1.vc, t2.id,t2.vc" +
            "  from t1 left join t2 " +
            " on t1.id = t2.id ");
        table
                            .execute().print();


        env.execute();

    }
}
